{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# **ATL06 to ATL11**\n",
    "\n",
    "Converting the ICESat-2 ATL06 (Land Ice Height) product to ATL11 (Land Ice Height Changes).\n",
    "Also convert the ATL11 file format from HDF5 to [Zarr](https://zarr.readthedocs.io/)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import glob\n",
    "import sys\n",
    "import subprocess\n",
    "\n",
    "import dask\n",
    "import dask.distributed\n",
    "import h5py\n",
    "import intake\n",
    "import itertools\n",
    "import matplotlib.pyplot as plt\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "import pyproj\n",
    "import tqdm\n",
    "import xarray as xr\n",
    "import zarr\n",
    "\n",
    "os.environ[\"HDF5_USE_FILE_LOCKING\"] = \"FALSE\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {
    "lines_to_next_cell": 1
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<table style=\"border: 2px solid white;\">\n",
       "<tr>\n",
       "<td style=\"vertical-align: top; border: 0px solid white\">\n",
       "<h3 style=\"text-align: left;\">Client</h3>\n",
       "<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n",
       "  <li><b>Scheduler: </b>tcp://127.0.0.1:37523</li>\n",
       "  <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a></li>\n",
       "</ul>\n",
       "</td>\n",
       "<td style=\"vertical-align: top; border: 0px solid white\">\n",
       "<h3 style=\"text-align: left;\">Cluster</h3>\n",
       "<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n",
       "  <li><b>Workers: </b>72</li>\n",
       "  <li><b>Cores: </b>72</li>\n",
       "  <li><b>Memory: </b>201.22 GB</li>\n",
       "</ul>\n",
       "</td>\n",
       "</tr>\n",
       "</table>"
      ],
      "text/plain": [
       "<Client: 'tcp://127.0.0.1:37523' processes=72 threads=72, memory=201.22 GB>"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "client = dask.distributed.Client(n_workers=72, threads_per_worker=1)\n",
    "client"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "def first_last_cycle_numbers(referencegroundtrack: int, orbitalsegment: int):\n",
    "    \"\"\"\n",
    "    Obtain the first and last cycle numbers for an ATL06 track, given the\n",
    "    reference ground track and orbital segment number as input.\n",
    "    \"\"\"\n",
    "    files = glob.glob(\n",
    "        f\"ATL06.003/**/ATL06*_*_{referencegroundtrack:04d}??{orbitalsegment:02d}_*.h5\"\n",
    "    )\n",
    "\n",
    "    first_cycle = min(files)[-14:-12]  # e.g. '02'\n",
    "    last_cycle = max(files)[-14:-12]  # e.g. '07'\n",
    "\n",
    "    return first_cycle, last_cycle"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "lines_to_next_cell": 2
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 4161/4161 [04:23<00:00, 15.77it/s]\n"
     ]
    }
   ],
   "source": [
    "# Create ATL06_to_ATL11 processing script, if not already present\n",
    "if not os.path.exists(\"ATL06_to_ATL11_Antarctica.sh\"):\n",
    "    # find first and last cycles for each reference ground track and each orbital segment\n",
    "    futures = []\n",
    "    for referencegroundtrack in range(1387, 0, -1):\n",
    "        for orbitalsegment in [10, 11, 12]:  # loop through Antarctic orbital segments\n",
    "            cyclenums = client.submit(\n",
    "                first_last_cycle_numbers,\n",
    "                referencegroundtrack,\n",
    "                orbitalsegment,\n",
    "                key=f\"{referencegroundtrack:04d}-{orbitalsegment}\",\n",
    "            )\n",
    "            futures.append(cyclenums)\n",
    "\n",
    "    # Prepare string to write into ATL06_to_ATL11_Antarctica.sh bash script\n",
    "    writelines = []\n",
    "    for f in tqdm.tqdm(\n",
    "        iterable=dask.distributed.as_completed(futures=futures), total=len(futures)\n",
    "    ):\n",
    "        referencegroundtrack, orbitalsegment = f.key.split(\"-\")\n",
    "        first_cycle, last_cycle = f.result()\n",
    "        writelines.append(\n",
    "            f\"python3 ATL11/ATL06_to_ATL11.py\"\n",
    "            f\" {referencegroundtrack} {orbitalsegment}\"\n",
    "            f\" --cycles {first_cycle} {last_cycle}\"\n",
    "            f\" --Release 3\"\n",
    "            f\" --directory 'ATL06.003/**/'\"\n",
    "            f\" --out_dir ATL11.001\\n\"\n",
    "        )\n",
    "    writelines.sort()  # sort writelines in place\n",
    "\n",
    "    # Finally create the bash script\n",
    "    with open(file=\"ATL06_to_ATL11_Antarctica.sh\", mode=\"w\") as f:\n",
    "        f.writelines(writelines)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now use [GNU parallel](https://www.gnu.org/software/parallel/parallel_tutorial.html) to run the script in parallel.\n",
    "Will take about 1 week to run on 64 cores.\n",
    "\n",
    "Reference:\n",
    "\n",
    "- O. Tange (2018): GNU Parallel 2018, Mar 2018, ISBN 9781387509881, DOI https://doi.org/10.5281/zenodo.1146014"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[7m68% 1942:896=1d07h16m32s python3 ATL11/ATL06_to_ATL11.\u001b[0mpy 1112 10 --cycles 01 07 \u001b[0m"
     ]
    }
   ],
   "source": [
    "# !PYTHONPATH=`pwd` PYTHONWARNINGS=\"ignore\" parallel -a ATL06_to_ATL11_Antarctica.sh --bar --resume-failed --results logdir --joblog log --jobs 80 --load 100% > /dev/null\n",
    "# !PYTHONPATH=`pwd` PYTHONWARNINGS=\"ignore\" parallel -a ATL06_to_ATL11_Antarctica_20200513_20200716_diff.sh --bar --resume-failed --results logdir --joblog log --jobs 80 --load 100% > /dev/null"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "# df_log = pd.read_csv(filepath_or_buffer=\"log\", sep=\"\\t\")\n",
    "# df_log.query(expr=\"Exitval > 0\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Convert from HDF5 to Zarr format\n",
    "\n",
    "For faster data access speeds!\n",
    "We'll collect the data for each Reference Ground Track,\n",
    "and store it inside a Zarr format,\n",
    "specifically one that can be used by xarray.\n",
    "See also http://xarray.pydata.org/en/v0.15.1/io.html#zarr.\n",
    "\n",
    "Grouping hierarchy:\n",
    "  - Reference Ground Track (1-1387)\n",
    "    - Orbital Segments (10, 11, 12)\n",
    "      - Laser Pairs (pt1, pt2, pt3)\n",
    "        - Attributes (longitude, latitude, h_corr, delta_time, etc)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "lines_to_next_cell": 1
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "8 ICESat-2 cycles available\n"
     ]
    }
   ],
   "source": [
    "# for atl11file in tqdm.tqdm(iterable=sorted(glob.glob(\"ATL11.001/*.h5\"))):\n",
    "#     name = os.path.basename(p=os.path.splitext(p=atl11file)[0])\n",
    "\n",
    "max_cycles: int = max(int(f[-11:-10]) for f in glob.glob(\"ATL11.001/*.h5\"))\n",
    "print(f\"{max_cycles} ICESat-2 cycles available\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dask.delayed\n",
    "def open_ATL11(atl11file: str, group: str) -> xr.Dataset:\n",
    "    \"\"\"\n",
    "    Opens up an ATL11 file using xarray and does some light pre-processing:\n",
    "    - Mask values using _FillValue ??\n",
    "    - Convert attribute format from binary to str\n",
    "    \"\"\"\n",
    "    ds = xr.open_dataset(\n",
    "        filename_or_obj=atl11file, group=group, engine=\"h5netcdf\", mask_and_scale=True\n",
    "    )\n",
    "\n",
    "    # Change xarray.Dataset attributes from binary to str type\n",
    "    # fixes issue when saving to Zarr format later\n",
    "    # TypeError: Object of type bytes is not JSON serializable\n",
    "    for key, variable in ds.variables.items():\n",
    "        assert isinstance(ds[key].DIMENSION_LABELS, np.ndarray)\n",
    "        ds[key].attrs[\"DIMENSION_LABELS\"] = (\n",
    "            ds[key].attrs[\"DIMENSION_LABELS\"].astype(str)\n",
    "        )\n",
    "    try:\n",
    "        ds.attrs[\"ATL06_xover_field_list\"] = ds.attrs[\"ATL06_xover_field_list\"].astype(\n",
    "            str\n",
    "        )\n",
    "    except KeyError:\n",
    "        pass\n",
    "\n",
    "    return ds"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "lines_to_next_cell": 2
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 1387/1387 [00:10<00:00, 127.39it/s]\n"
     ]
    }
   ],
   "source": [
    "# Consolidate together Antarctic orbital segments 10, 11, 12 into one file\n",
    "# Also consolidate all three laser pairs pt1, pt2, pt3 into one file\n",
    "atl11_dict = {}\n",
    "for rgt in tqdm.trange(1387):\n",
    "    atl11files: list = glob.glob(f\"ATL11.001/ATL11_{rgt+1:04d}1?_????_00?_0?.h5\")\n",
    "\n",
    "    try:\n",
    "        assert len(atl11files) == 3  # Should be 3 files for Orbital Segments 10,11,12\n",
    "    except AssertionError:\n",
    "        # Manually handle exceptional cases\n",
    "        if len(atl11files) != 2 or rgt + 1 not in [1036]:\n",
    "            raise ValueError(\n",
    "                f\"{rgt+1} only has {len(atl11files)} ATL11 files instead of 3\"\n",
    "            )\n",
    "\n",
    "    if atl11files:\n",
    "        pattern: dict = intake.source.utils.reverse_format(\n",
    "            format_string=\"ATL11.001/ATL11_{referencegroundtrack:4}{orbitalsegment:2}_{cycles:4}_{version:3}_{revision:2}.h5\",\n",
    "            resolved_string=sorted(atl11files)[1],  # get the '11' one, not '10' or '12'\n",
    "        )\n",
    "        zarrfilepath: str = \"ATL11.001z123/ATL11_{referencegroundtrack}1x_{cycles}_{version}_{revision}.zarr\".format(\n",
    "            **pattern\n",
    "        )\n",
    "        atl11_dict[zarrfilepath] = atl11files"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Get proper data encoding from a sample ATL11 file\n",
    "atl11file: str = atl11files[0]\n",
    "root_ds = open_ATL11(atl11file=atl11file, group=\"pt2\").compute()\n",
    "reference_surface_ds = open_ATL11(atl11file=atl11file, group=\"pt2/ref_surf\").compute()\n",
    "ds: xr.Dataset = xr.combine_by_coords(datasets=[root_ds, reference_surface_ds])\n",
    "\n",
    "# Convert variables to correct datatype\n",
    "encoding: dict = {}\n",
    "df: pd.DataFrame = pd.read_csv(\"ATL11/ATL11_output_attrs.csv\")[[\"field\", \"datatype\"]]\n",
    "df = df.set_index(\"field\")\n",
    "for var in ds.variables:\n",
    "    desired_dtype = str(df.datatype[var]).lower()\n",
    "    if ds[var].dtype.name != desired_dtype:\n",
    "        try:\n",
    "            desired_dtype = desired_dtype.split(var)[1].strip()\n",
    "        except IndexError:\n",
    "            pass\n",
    "    encoding[var] = {\"dtype\": desired_dtype}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 1387/1387 [00:30<00:00, 45.77it/s]\n"
     ]
    }
   ],
   "source": [
    "# Gather up all the dask.delayed conversion tasks to store data into Zarr!\n",
    "stores = []\n",
    "for zarrfilepath, atl11files in tqdm.tqdm(iterable=atl11_dict.items()):\n",
    "    zarr.open(store=zarrfilepath, mode=\"w\")  # Make a new file/overwrite existing\n",
    "    datasets = []\n",
    "    for atl11file in atl11files:  # Orbital Segments: 10, 11, 12\n",
    "        for pair in (\"pt1\", \"pt2\", \"pt3\"):  # Laser pairs: pt1, pt2, pt3\n",
    "            # Attributes: longitude, latitude, h_corr, delta_time, etc\n",
    "            root_ds = open_ATL11(atl11file=atl11file, group=pair)\n",
    "            reference_surface_ds = open_ATL11(\n",
    "                atl11file=atl11file, group=f\"{pair}/ref_surf\"\n",
    "            )\n",
    "            ds = dask.delayed(obj=xr.combine_by_coords)(\n",
    "                datasets=[root_ds, reference_surface_ds]\n",
    "            )\n",
    "            datasets.append(ds)\n",
    "\n",
    "    dataset = dask.delayed(obj=xr.concat)(objs=datasets, dim=\"ref_pt\")\n",
    "    store_task = dataset.to_zarr(\n",
    "        store=zarrfilepath, mode=\"w\", encoding=encoding, consolidated=True\n",
    "    )\n",
    "    stores.append(store_task)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "100%|██████████| 1387/1387 [26:06<00:00,  1.13s/it]\n"
     ]
    }
   ],
   "source": [
    "# Do all the HDF5 to Zarr conversion! Should take about half an hour to run\n",
    "# Check conversion progress here, https://stackoverflow.com/a/37901797/6611055\n",
    "futures = [client.compute(store_task) for store_task in stores]\n",
    "for _ in tqdm.tqdm(\n",
    "    iterable=dask.distributed.as_completed(futures=futures), total=len(stores)\n",
    "):\n",
    "    pass"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "lines_to_next_cell": 2
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "(185396, 7)"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "ds = xr.open_dataset(zarrfilepath, engine=\"zarr\", backend_kwargs={\"consolidated\": True})\n",
    "ds.h_corr.__array__().shape"
   ]
  },
  {
   "cell_type": "raw",
   "metadata": {
    "lines_to_next_cell": 2
   },
   "source": [
    "# Note, this raw conversion below takes about 11 hours\n",
    "# because HDF5 files work on a single thread...\n",
    "for atl11file in tqdm.tqdm(iterable=sorted(glob.glob(\"ATL11.001/*.h5\"))):\n",
    "    name = os.path.basename(p=os.path.splitext(p=atl11file)[0])\n",
    "    zarr.convenience.copy_all(\n",
    "        source=h5py.File(name=atl11file, mode=\"r\"),\n",
    "        dest=zarr.open_group(store=f\"ATL11.001z/{name}.zarr\", mode=\"w\"),\n",
    "        if_exists=\"skip\",\n",
    "        without_attrs=True,\n",
    "    )"
   ]
  }
 ],
 "metadata": {
  "jupytext": {
   "cell_metadata_filter": "title,-all",
   "formats": "ipynb,py:hydrogen"
  },
  "kernelspec": {
   "display_name": "deepicedrain",
   "language": "python",
   "name": "deepicedrain"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
